package com.code.ape.codeape.inhos.mq.listener;

import com.code.ape.codeape.common.core.util.AssertUtil;
import com.code.ape.codeape.common.elasticsearch.ResetElasticSearchClient;
import com.code.ape.codeape.inhos.api.entity.constant.MqConstant;
import com.code.ape.codeape.inhos.api.entity.constant.PatInfoConstant;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.rest.RestStatus;
import org.springframework.stereotype.Component;

/**
 * @author 公众号：码猿技术专栏
 * @url: www.java-family.cn
 * @description 处理患者办理住院的消息
 * 踩坑：1. 保证消费幂等
 */
@RocketMQMessageListener(topic = MqConstant.PAT_TOPIC,
		selectorExpression=MqConstant.TAG_HOT,
		consumerGroup = "pat-hot-consumer",
		//并发消费的线程，默认64，这里调整为5
		consumeThreadMax = 5,
		//消费失败的重试次数，这里调整为5次，达到5次后进入死信队列，人工干预
		maxReconsumeTimes=5)
@Slf4j
@Component
@RequiredArgsConstructor
public class PatHotMqListener implements RocketMQListener<String> {


	private final ResetElasticSearchClient resetElasticSearchClient;

	@SneakyThrows
	@Override
	public void onMessage(String message) {
		if (log.isDebugEnabled())
			log.debug("消费端接收到一条入院冷热分离的消息：{}",message);
		//step1 删除ElasticSearch中的数据
		DeleteResponse response = resetElasticSearchClient.delete(PatInfoConstant.PAT_INFO_INDEX_NAME, message);
		AssertUtil.isTrue(response.status().equals(RestStatus.OK),"删除ElasticSearch中患者数据失败");
	}
}
